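Enterprise Flink SQL Real-Time Data Warehouse

Complete Production Deployment Tutorial

V3.1 | Flink on YARN Application Mode + Paimon Unified Streaming and Batch | 2026-06-08

About This Document

Scope: a complete, unabridged, step-by-step guide to a production-grade real-time data warehouse, written so it can be reproduced from scratch and taken to production.
Version compatibility: Flink CDC 2.4.0 + Paimon 0.8.2 + Flink 1.18.0; ClickHouse JDBC Connector 1.18.0 + ClickHouse 22.8.x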

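I. Enterprise Architecture Overview

1.1 Architecture Data Flow

(Figure: data flow architecture diagram)

1.2 Mandatory Production Standards

1.3 Overall Architecture Topology

(Figure: overall architecture topology diagram)

II. Server Base Environment Setup

2.1 Cluster Node Plan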

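Node IP | Hostname | Components
192.168.73.205 | hadoop102 | Hadoop NN, ZK, Kafka, Flink JM
192.168.73.206 | hadoop103 | Hadoop DN, ZK, Kafka, Flink TM
192.168.73.207 | hadoop104 | Hadoop DN, ZK, Kafka, Flink TM
192.168.130.145 | - | MySQL 8.0, ClickHouse 22.8
Resource baseline: JM 4 GB; TM 8 GB with 4 slots; parallelism 4; roughly 50,000 CDC records/s per job in this setup. In YARN Application Mode, YARN chooses which NodeManagers host the JM and TM containers, so the Flink placement above is nominal.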
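The baseline translates into simple slot arithmetic. A minimal sketch, assuming one slot-sharing group per job (Flink's default) and the four layer jobs submitted in section VII; `tms_needed` and `tm_memory_gb` are illustrative helpers, not Flink tools:

```bash
#!/usr/bin/env bash
# TaskManagers YARN must start for one job: ceil(parallelism / slots_per_tm),
# assuming all operators share slots (Flink's default slot-sharing group).
tms_needed() {
  local parallelism=$1 slots_per_tm=$2
  echo $(( (parallelism + slots_per_tm - 1) / slots_per_tm ))
}

# Total TaskManager memory (GB) for N jobs of identical shape; JobManagers not included.
tm_memory_gb() {
  local jobs=$1 parallelism=$2 slots_per_tm=$3 tm_gb=$4
  echo $(( jobs * $(tms_needed "$parallelism" "$slots_per_tm") * tm_gb ))
}

# Baseline: parallelism 4, 4 slots per TM, 8 GB per TM, four jobs (ODS/DWD/DWS/ADS)
echo "TMs per job: $(tms_needed 4 4)"
echo "TM memory for 4 jobs: $(tm_memory_gb 4 4 4 8) GB"
```

With these numbers each job needs one 8 GB TaskManager, 32 GB of TM memory for all four jobs; in Application Mode each job additionally runs its own 4 GB JobManager.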

2.2 Base Configuration on All Nodes

Note: every step in 2.2.1 to 2.2.5 must be run on each of hadoop102, hadoop103 and hadoop104.

2.2.1 Disable the Firewall and SELinux (all nodes)

systemctl stop firewalld
systemctl disable firewalld
setenforce 0
sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config

2.2.2 Hostname and hosts Mapping (all nodes)

hostnamectl set-hostname hadoop102  # use each node's own name: hadoop102 / hadoop103 / hadoop104
cat >> /etc/hosts << EOF
192.168.73.205 hadoop102
192.168.73.206 hadoop103
192.168.73.207 hadoop104
EOF
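`cat >> /etc/hosts` appends unconditionally, so re-running the step duplicates entries. A sketch of an idempotent alternative; `has_host_entry` and `ensure_host_entry` are hypothetical helpers:

```bash
#!/usr/bin/env bash
# Succeeds if FILE already maps IP to NAME (any whitespace; NAME may be one of several aliases).
has_host_entry() {
  local file=$1 ip=$2 name=$3
  awk -v ip="$ip" -v name="$name" '
    $1 == ip { for (i = 2; i <= NF; i++) if ($i == name) found = 1 }
    END { exit (found ? 0 : 1) }
  ' "$file"
}

# Append "IP NAME" only when that mapping is missing, so the step can be re-run safely.
ensure_host_entry() {
  local file=$1 ip=$2 name=$3
  has_host_entry "$file" "$ip" "$name" || echo "$ip $name" >> "$file"
}

# Usage on each node (as root):
# ensure_host_entry /etc/hosts 192.168.73.205 hadoop102
# ensure_host_entry /etc/hosts 192.168.73.206 hadoop103
# ensure_host_entry /etc/hosts 192.168.73.207 hadoop104
```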

2.2.3 Create a Dedicated Service User (all nodes)

useradd flink
passwd flink
usermod -aG wheel flink
# /opt/software   - installation packages (tar.gz / rpm)
# /opt/bigdata    - installed big data components (extracted programs)
# /data/hadoop    - Hadoop NameNode/DataNode metadata and data blocks
# /data/zookeeper - ZooKeeper snapshots and transaction logs
# /data/kafka     - Kafka message logs (segment files)
# /data/flink     - Flink RocksDB local state
mkdir -p /opt/software /opt/bigdata /data/hadoop /data/zookeeper /data/kafka /data/flink
chown -R flink:flink /opt/software /opt/bigdata /data/

2.2.4 Configure Passwordless SSH (all nodes)

su - flink
ssh-keygen
ssh-copy-id flink@hadoop102
ssh-copy-id flink@hadoop103
ssh-copy-id flink@hadoop104

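2.2.5 Install JDK 1.8 (all nodes, as root)

yum remove -y 'java-*-openjdk*'   # remove preinstalled OpenJDK packages; quoted so the shell does not expand the glob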
tar -zxvf /opt/software/jdk-8u381-linux-x64.tar.gz -C /opt/bigdata/
cat >> /etc/profile << EOF
export JAVA_HOME=/opt/bigdata/jdk1.8.0_381
export JRE_HOME=\$JAVA_HOME/jre
export CLASSPATH=.:\$JAVA_HOME/lib:\$JRE_HOME/lib
export PATH=\$JAVA_HOME/bin:\$PATH
EOF
source /etc/profile
java -version
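Because another JDK may still be first on the PATH, it is worth confirming that `java -version` really reports Java 8. A small sketch that turns the version string into a major version, covering both the legacy `1.8.0_381` scheme and the JDK 9+ `11.0.2` scheme (`java_major` is a hypothetical helper):

```bash
#!/usr/bin/env bash
# Major Java version from a version string: "1.8.0_381" -> 8, "11.0.2" -> 11, "17" -> 17.
java_major() {
  local v=$1
  if [[ $v == 1.* ]]; then
    v=${v#1.}            # legacy scheme: drop the leading "1."
  fi
  echo "${v%%[._-]*}"    # keep everything before the first '.', '_' or '-'
}

# `java -version` writes to stderr, e.g.: java version "1.8.0_381"
# ver=$(java -version 2>&1 | awk -F'"' '/version/ {print $2; exit}')
# [ "$(java_major "$ver")" = "8" ] || echo "expected JDK 8, found $ver"
```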

III. Big Data Component Clusters

3.1 Hadoop 3.3.6 Cluster

tar -zxvf /opt/software/hadoop-3.3.6.tar.gz -C /opt/bigdata/
chown -R flink:flink /opt/bigdata/hadoop-3.3.6

core-site.xml

<configuration>
    <property><name>fs.defaultFS</name><value>hdfs://hadoop102:9000</value></property>
    <property><name>hadoop.tmp.dir</name><value>/data/hadoop/tmp</value></property>
    <property><name>hadoop.proxyuser.flink.hosts</name><value>*</value></property>
    <property><name>hadoop.proxyuser.flink.groups</name><value>*</value></property>
</configuration>

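core-site.xml parameters

Parameter | Description
fs.defaultFS | HDFS NameNode address; every component reaches HDFS through it
hadoop.tmp.dir | Hadoop base working directory. Set it explicitly: the default, /tmp/hadoop-${user.name}, can be wiped on reboot, and by default the NameNode metadata and DataNode blocks live under it
hadoop.proxyuser.* | Proxy-user (impersonation) settings that let the flink user act on behalf of other users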

hdfs-site.xml

<configuration>
    <property><name>dfs.replication</name><value>2</value></property>
    <property><name>dfs.namenode.secondary.http-address</name><value>hadoop103:50090</value></property>
</configuration>

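hdfs-site.xml parameters

Parameter | Description
dfs.replication | Block replica count: 2 for this 3-node cluster; 3 is recommended from 5 nodes up
dfs.namenode.secondary.http-address | Secondary NameNode address. The SNN periodically merges the FsImage and EditLog; it is a checkpointing helper, not a hot standby (Hadoop 3's default port is 9868; 50090 works because it is set explicitly)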

yarn-site.xml

<configuration>
    <property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
    <property><name>yarn.resourcemanager.hostname</name><value>hadoop102</value></property>
    <property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value></property>
    <property><name>yarn.nodemanager.pmem-check-enabled</name><value>false</value></property>
</configuration>

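yarn-site.xml parameters

Parameter | Description
yarn.nodemanager.aux-services | NodeManager auxiliary service; mapreduce_shuffle is MapReduce's shuffle channel (Flink does not need it)
yarn.resourcemanager.hostname | ResourceManager host, the cluster-wide resource scheduler
yarn.nodemanager.vmem-check-enabled | false: stops YARN from killing Flink TM containers for exceeding the virtual-memory limit (JVMs reserve far more virtual than physical memory)
yarn.nodemanager.pmem-check-enabled | false: likewise avoids containers being killed for physical-memory overuse; this also removes YARN's guard against a container exhausting node memory

workers (all three hosts run a DataNode and a NodeManager):
printf 'hadoop102\nhadoop103\nhadoop104\n' > /opt/bigdata/hadoop-3.3.6/etc/hadoop/workers

cat >> /etc/profile << EOF   # all nodes, as root
export HADOOP_HOME=/opt/bigdata/hadoop-3.3.6
export PATH=\$HADOOP_HOME/bin:\$HADOOP_HOME/sbin:\$PATH
EOF
source /etc/profile
echo 'export JAVA_HOME=/opt/bigdata/jdk1.8.0_381' >> /opt/bigdata/hadoop-3.3.6/etc/hadoop/hadoop-env.sh   # start-*.sh start daemons over SSH, which does not load /etc/profile
scp -r /opt/bigdata/hadoop-3.3.6 flink@hadoop103:/opt/bigdata/
scp -r /opt/bigdata/hadoop-3.3.6 flink@hadoop104:/opt/bigdata/
hdfs namenode -format  # on hadoop102, first run only!
start-dfs.sh   # on hadoop102
start-yarn.sh  # on hadoop102 (ResourceManager host)
jps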
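After `start-dfs.sh` and `start-yarn.sh`, each node's `jps` output should list the daemons its role implies. Reading the configuration above, hadoop102 should show NameNode, DataNode, ResourceManager and NodeManager, hadoop103 DataNode, NodeManager and SecondaryNameNode, and hadoop104 DataNode and NodeManager; adjust if your layout differs. A sketch that reports which expected daemons are missing (`missing_daemons` is a hypothetical helper):

```bash
#!/usr/bin/env bash
# Print every expected daemon class name that does not appear in the given jps output.
# jps prints "<pid> <MainClassName>" per line; the name is matched exactly,
# so "NameNode" is not satisfied by "SecondaryNameNode".
missing_daemons() {
  local jps_output=$1; shift
  local d
  for d in "$@"; do
    if ! awk -v d="$d" '$2 == d { found = 1 } END { exit (found ? 0 : 1) }' <<< "$jps_output"; then
      echo "$d"
    fi
  done
}

# Usage on hadoop102:
# missing=$(missing_daemons "$(jps)" NameNode DataNode ResourceManager NodeManager)
# [ -z "$missing" ] || echo "not running: $missing"
```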

3.2 ZooKeeper 3.7.1 Cluster

tar -zxvf /opt/software/apache-zookeeper-3.7.1-bin.tar.gz -C /opt/bigdata/
cp /opt/bigdata/apache-zookeeper-3.7.1-bin/conf/zoo_sample.cfg /opt/bigdata/apache-zookeeper-3.7.1-bin/conf/zoo.cfg
# Edit conf/zoo.cfg so that it contains:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=hadoop102:2888:3888
server.2=hadoop103:2888:3888
server.3=hadoop104:2888:3888

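zoo.cfg parameters

Parameter | Description
tickTime | Basic time unit in ms; all ZooKeeper timeouts are multiples of it
initLimit | Ticks a follower may take to connect and sync to the leader at startup: 10 x 2 s = 20 s
syncLimit | Ticks a follower may lag behind the leader before it is dropped: 5 x 2 s = 10 s
dataDir | Directory for snapshots, transaction logs (no separate dataLogDir is set) and the myid file
clientPort | Client port; Kafka (zookeeper.connect) and Flink HA (high-availability.zookeeper.quorum) connect here
server.N | Ensemble members, where N is that node's myid. Port 2888 carries follower-to-leader traffic (quorum communication and data sync); port 3888 is used for leader election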
echo 1 > /data/zookeeper/myid  # hadoop102
echo 2 > /data/zookeeper/myid  # hadoop103
echo 3 > /data/zookeeper/myid  # hadoop104
scp -r /opt/bigdata/apache-zookeeper-3.7.1-bin flink@hadoop103:/opt/bigdata/
scp -r /opt/bigdata/apache-zookeeper-3.7.1-bin flink@hadoop104:/opt/bigdata/
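/opt/bigdata/apache-zookeeper-3.7.1-bin/bin/zkServer.sh start    # on every node
/opt/bigdata/apache-zookeeper-3.7.1-bin/bin/zkServer.sh status   # expect one leader and two followers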
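The three hand-typed `echo N > /data/zookeeper/myid` lines must agree with the `server.N` entries in zoo.cfg; a node whose myid does not match its entry cannot join the ensemble correctly. A sketch that derives the ID from zoo.cfg instead (`zk_myid_for_host` is a hypothetical helper and assumes short hostnames without dots):

```bash
#!/usr/bin/env bash
# Print N from the "server.N=<host>:2888:3888" line for HOST; fail if HOST is not listed.
zk_myid_for_host() {
  local cfg=$1 host=$2
  # Splitting on '.', '=' and ':' gives: $1=server $2=N $3=host $4=2888 $5=3888
  awk -F'[.=:]' -v h="$host" '
    $1 == "server" && $3 == h { print $2; found = 1 }
    END { exit (found ? 0 : 1) }
  ' "$cfg"
}

# Usage on each ZooKeeper node:
# id=$(zk_myid_for_host /opt/bigdata/apache-zookeeper-3.7.1-bin/conf/zoo.cfg "$(hostname)") \
#   && echo "$id" > /data/zookeeper/myid
```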

3.3 Kafka 3.2.3 Cluster

tar -zxvf /opt/software/kafka_2.12-3.2.3.tgz -C /opt/bigdata/
# Edit config/server.properties (values shown are for hadoop102):
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.73.205:9092
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
num.partitions=3
default.replication.factor=2
log.dirs=/data/kafka/logs

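Kafka server.properties parameters

Parameter | Description
broker.id | Unique ID of each broker; must differ on all three nodes (0 / 1 / 2)
listeners | Listening address; 0.0.0.0 binds all network interfaces
advertised.listeners | Address advertised to clients; must be an IP that clients can reach (each node's own IP)
zookeeper.connect | ZooKeeper ensemble; Kafka stores its metadata in ZooKeeper
num.partitions | Default partition count for new topics
default.replication.factor | Default replica count for new topics; 2 lets every partition survive the loss of one broker
log.dirs | Kafka message log directory; a dedicated disk is recommended
scp -r /opt/bigdata/kafka_2.12-3.2.3 flink@hadoop103:/opt/bigdata/   # there: broker.id=1, advertised.listeners=PLAINTEXT://192.168.73.206:9092
scp -r /opt/bigdata/kafka_2.12-3.2.3 flink@hadoop104:/opt/bigdata/   # there: broker.id=2, advertised.listeners=PLAINTEXT://192.168.73.207:9092
/opt/bigdata/kafka_2.12-3.2.3/bin/kafka-server-start.sh -daemon /opt/bigdata/kafka_2.12-3.2.3/config/server.properties   # on every node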
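Only `broker.id` and `advertised.listeners` differ between the three brokers, so the other two files can be generated from hadoop102's copy rather than edited by hand. A sketch (`render_broker_props` is a hypothetical helper; it assumes the PLAINTEXT listener on port 9092 used above):

```bash
#!/usr/bin/env bash
# Print TEMPLATE with broker.id and advertised.listeners replaced; all other lines are copied unchanged.
render_broker_props() {
  local template=$1 broker_id=$2 ip=$3
  awk -v id="$broker_id" -v ip="$ip" '
    /^broker\.id=/            { print "broker.id=" id; next }
    /^advertised\.listeners=/ { print "advertised.listeners=PLAINTEXT://" ip ":9092"; next }
    { print }
  ' "$template"
}

# Usage on hadoop102, then copy each result to the matching node's config/server.properties:
# render_broker_props config/server.properties 1 192.168.73.206 > /tmp/server.properties.hadoop103
# render_broker_props config/server.properties 2 192.168.73.207 > /tmp/server.properties.hadoop104
```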

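3.4 MySQL 8.0

yum remove -y 'mariadb*'
yum install -y mysql-server
systemctl start mysqld
systemctl enable mysqld
grep 'temporary password' /var/log/mysqld.log   # present for MySQL community RPMs; with the OS-bundled package root may have no password yet
mysql -uroot -p
ALTER USER 'root'@'localhost' IDENTIFIED BY 'YourStrongPassword@2026';
CREATE DATABASE biz_db;
CREATE DATABASE ads_db;
CREATE USER 'flink'@'192.168.73.%' IDENTIFIED BY 'YourStrongPassword@2026';
-- global privileges mysql-cdc needs to read table metadata and the binlog
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink'@'192.168.73.%';
GRANT SELECT, INSERT, UPDATE, DELETE ON biz_db.* TO 'flink'@'192.168.73.%';
GRANT SELECT, INSERT, UPDATE, DELETE ON ads_db.* TO 'flink'@'192.168.73.%';
FLUSH PRIVILEGES;
Security warning: the passwords are examples only; use strong passwords of 16+ characters in production. mysql-cdc requires the binlog to be enabled with binlog_format=ROW (and binlog_row_image=FULL). MySQL 8.0 ships with these defaults, so verify they have not been changed; without them CDC cannot work.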
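The Flink CDC MySQL connector documentation requires `binlog_format=ROW` and `binlog_row_image=FULL` on an enabled binlog. A sketch that checks the three server variables from `SHOW VARIABLES` output (`check_binlog_vars` is a hypothetical helper; it expects the tab-separated output of `mysql -N`):

```bash
#!/usr/bin/env bash
# Read "name<TAB>value" lines on stdin, e.g. from:
#   mysql -N -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','binlog_row_image')"
# Print one line per problem; exit 0 only when all three settings are as mysql-cdc requires.
check_binlog_vars() {
  awk -F'\t' '
    { v[$1] = toupper($2) }
    END {
      bad = 0
      if (v["log_bin"] != "ON")            { print "log_bin is not ON"; bad = 1 }
      if (v["binlog_format"] != "ROW")     { print "binlog_format must be ROW"; bad = 1 }
      if (v["binlog_row_image"] != "FULL") { print "binlog_row_image must be FULL"; bad = 1 }
      exit bad
    }'
}
```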

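3.5 ClickHouse 22.8 Offline Deployment

# Download the RPM packages in advance on a machine with Internet access
wget https://github.com/ClickHouse/ClickHouse/releases/download/v22.8.20.22-lts/clickhouse-server-22.8.20.22-1.el8.x86_64.rpm
wget https://github.com/ClickHouse/ClickHouse/releases/download/v22.8.20.22-lts/clickhouse-client-22.8.20.22-1.el8.x86_64.rpm
wget https://github.com/ClickHouse/ClickHouse/releases/download/v22.8.20.22-lts/clickhouse-common-static-22.8.20.22-1.el8.x86_64.rpm
# Copy them to /opt/software on the production server, then run:
yum localinstall -y /opt/software/clickhouse*.rpm
# The server listens only on localhost by default; open it to the Flink nodes
echo '<clickhouse><listen_host>0.0.0.0</listen_host></clickhouse>' > /etc/clickhouse-server/config.d/listen.xml
systemctl start clickhouse-server
systemctl enable clickhouse-server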
clickhouse-client -q "CREATE DATABASE IF NOT EXISTS dws_db;"
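
IV. Flink 1.18 on YARN Deployment

4.1 Install Flink and Set Environment Variables

tar -zxvf /opt/software/flink-1.18.0-bin-scala_2.12.tgz -C /opt/bigdata/
cat >> /etc/profile << EOF
export FLINK_HOME=/opt/bigdata/flink-1.18.0
export HADOOP_CONF_DIR=/opt/bigdata/hadoop-3.3.6/etc/hadoop
export YARN_CONF_DIR=/opt/bigdata/hadoop-3.3.6/etc/hadoop
# Flink 1.18 bundles no Hadoop jars; Flink on YARN picks them up from HADOOP_CLASSPATH
export HADOOP_CLASSPATH=\$(/opt/bigdata/hadoop-3.3.6/bin/hadoop classpath)
export PATH=\$FLINK_HOME/bin:\$PATH
EOF
source /etc/profile

4.2 Production flink-conf.yaml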

jobmanager.memory.process.size: 4g
taskmanager.memory.process.size: 8g
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4
state.backend: rocksdb
state.backend.rocksdb.localdir: /data/flink/rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints
execution.checkpointing.interval: 30000
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 600000
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
state.backend.incremental: true
high-availability: zookeeper
high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181
high-availability.storageDir: hdfs:///flink/ha
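# high-availability.cluster-id is deliberately not set: on YARN, Flink uses the application ID, so each job gets its own HA namespace
yarn.application-attempts: 3
yarn.maximum-failed-containers: 10
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
# a port range lets several JM/TM processes on one host each bind their own reporter port
metrics.reporter.prom.port: 9249-9260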

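Flink core configuration parameters

Parameter | Value | Description
jobmanager.memory.process.size | 4g | Total JM process memory; the JM manages the job graph, scheduling and checkpoint coordination
taskmanager.memory.process.size | 8g | Total TM process memory; TMs run the operators and hold the state
taskmanager.numberOfTaskSlots | 4 | Slots per TM. With slot sharing (the default), one slot hosts one parallel slice of the whole pipeline, i.e. one subtask of each operator, not a single thread
parallelism.default | 4 | Default parallelism; when it equals the slot count of the allocated TMs, no slot sits idle
state.backend | rocksdb | RocksDB keeps state on local disk and supports incremental checkpoints and very large state; the standard production choice
state.backend.incremental | true | Incremental checkpoints upload only changed RocksDB files, greatly reducing checkpoint time for large state
execution.checkpointing.interval | 30000 | Checkpoint interval in ms; 30 s is a common production value
execution.checkpointing.mode | EXACTLY_ONCE | Exactly-once state consistency; end-to-end exactly-once additionally needs a transactional or idempotent sink
execution.checkpointing.externalized-checkpoint-retention | RETAIN_ON_CANCELLATION | Keep the last checkpoint when the job is cancelled so it can be restored; always enable in production
high-availability | zookeeper | ZooKeeper handles JM leader election and HA metadata; a failed JM is recovered automatically
yarn.application-attempts | 3 | How often YARN restarts the application (JM) after a failure; capped by YARN's yarn.resourcemanager.am.max-attempts (default 2)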
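A quick pre-deployment sanity check for values like those in the table. This is a sketch that only understands flat `key: value` lines with plain millisecond numbers, as used in this document; requiring the timeout to exceed the interval is a rule of thumb, not something Flink enforces. `conf_get` and `check_flink_conf` are hypothetical helpers:

```bash
#!/usr/bin/env bash
# Print the value of a flat "key: value" entry in flink-conf.yaml (comment lines are skipped).
conf_get() {
  local file=$1 key=$2
  awk -v k="$key" '
    /^[[:space:]]*#/ { next }
    {
      i = index($0, ":")
      if (i > 0 && substr($0, 1, i - 1) == k) {
        v = substr($0, i + 1); sub(/^[[:space:]]+/, "", v); sub(/[[:space:]]+$/, "", v)
        print v; exit
      }
    }' "$file"
}

# Fail if the checkpoint interval/timeout are missing, not plain ms, or timeout <= interval.
check_flink_conf() {
  local f=$1 interval timeout
  interval=$(conf_get "$f" execution.checkpointing.interval)
  timeout=$(conf_get "$f" execution.checkpointing.timeout)
  if ! [[ $interval =~ ^[0-9]+$ && $timeout =~ ^[0-9]+$ ]]; then
    echo "interval/timeout missing or not plain milliseconds"; return 1
  fi
  if (( timeout <= interval )); then
    echo "execution.checkpointing.timeout ($timeout) should exceed the interval ($interval)"; return 1
  fi
  echo "ok: interval=${interval}ms timeout=${timeout}ms ($(( timeout / interval ))x)"
}

# Usage: check_flink_conf "$FLINK_HOME/conf/flink-conf.yaml"
```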
Initialize the HDFS directories and upload the connector jars (once, as the flink user):
hdfs dfs -mkdir -p /flink/checkpoints /flink/savepoints /flink/ha /flink/jars /flink/paimon_warehouse
hdfs dfs -chown -R flink:flink /flink
hdfs dfs -chmod -R 755 /flink
hdfs dfs -put /opt/software/*.jar /flink/jars/

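V. Paimon Catalog Metadata Initialization (run once)

CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs:///flink/paimon_warehouse',
    'hadoop-conf-dir' = '/opt/bigdata/hadoop-3.3.6/etc/hadoop',
    'table-default.file.format' = 'parquet'
);
USE CATALOG paimon_catalog;
CREATE DATABASE IF NOT EXISTS ods;
CREATE DATABASE IF NOT EXISTS dwd;
CREATE DATABASE IF NOT EXISTS dws;
CREATE DATABASE IF NOT EXISTS ads;

Paimon catalog parameters

Parameter | Description
type | Catalog type, always paimon. This filesystem catalog stores its own metadata in the warehouse, so no Hive Metastore is needed
warehouse | Warehouse root path; all Paimon table data is stored under this HDFS directory
hadoop-conf-dir | Hadoop configuration directory from which Paimon reads the HDFS connection settings (the HADOOP_CONF_DIR environment variable also works)
table-default.file.format | Options prefixed with table-default. apply to every table created in this catalog; here Parquet columnar files, which compress well and scan fast
Save the statements above as $FLINK_HOME/sql/catalog_init.sql and run:
sql-client.sh embedded -f /opt/bigdata/flink-1.18.0/sql/catalog_init.sql
Note: CREATE CATALOG registers the catalog only for the current SQL session. The databases persist in the warehouse, but every job SQL file must execute its own CREATE CATALOG paimon_catalog statement before USE CATALOG. The Paimon jar (paimon-flink-1.18-0.8.2.jar) must be on the SQL client's classpath, for example in $FLINK_HOME/lib.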

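VI. Complete Production SQL for the Four-Layer Real-Time Warehouse

Delivery convention: SQL files live under $FLINK_HOME/sql/ and are named catalog_init.sql / ods_user_behavior.sql / dwd_user_behavior.sql / dws_behavior_1min.sql / ads_daily_report.sql

6.1 ODS Layer: MySQL CDC Full Snapshot + Incremental Sync

CREATE CATALOG paimon_catalog WITH ('type' = 'paimon', 'warehouse' = 'hdfs:///flink/paimon_warehouse');
USE CATALOG paimon_catalog;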
USE ods;
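-- mysql-cdc source: declared TEMPORARY because a Paimon catalog can only store Paimon tables
CREATE TEMPORARY TABLE ods_user_behavior (
    user_id STRING, item_id STRING, behavior STRING,
    operate_time TIMESTAMP(3),
    PRIMARY KEY (user_id, operate_time) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc', 'hostname' = '192.168.130.145', 'port' = '3306',
    'username' = 'flink', 'password' = 'YourStrongPassword@2026',
    'database-name' = 'biz_db', 'table-name' = 'user_behavior',
    'server-id' = '5001-5004', 'scan.startup.mode' = 'initial',
    'debezium.snapshot.locking.mode' = 'none'
);
-- Paimon append table (no primary key); dt and hr are derived in the INSERT below
CREATE TABLE IF NOT EXISTS ods_user_behavior_paimon (
    user_id STRING, item_id STRING, behavior STRING,
    operate_time TIMESTAMP(3), dt STRING, hr STRING
) WITH (
    'bucket' = '4', 'bucket-key' = 'user_id', 'file.compression' = 'zstd'
);
INSERT INTO ods_user_behavior_paimon
SELECT user_id, item_id, behavior, operate_time,
    DATE_FORMAT(operate_time, 'yyyy-MM-dd') AS dt,
    DATE_FORMAT(operate_time, 'HH') AS hr
FROM ods_user_behavior;

ODS layer key parameters

Parameter | Description
server-id | Range of MySQL replica server IDs, e.g. '5001-5004'. Each parallel source reader takes one ID, so the range must be at least the source parallelism; ranges of different CDC jobs (and of other replicas of the same MySQL) must not overlap
scan.startup.mode | initial: take a full snapshot first, then continue with the incremental binlog
debezium.snapshot.locking.mode | none: do not lock tables during the snapshot, so business writes are not blocked
bucket / bucket-key | Number of Paimon buckets, 1 to 2 times the write parallelism as a starting point; a bucketed append table needs a bucket-key to route rows
file.compression | zstd: high compression ratio with query performance close to snappy; recommended for production
No primary key | Makes this a Paimon append table: rows are not deduplicated, so ODS keeps the raw data complete. Append tables accept inserts only, which assumes biz_db.user_behavior is an insert-only event log; UPDATE/DELETE binlog events are not supported here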
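When several CDC jobs read the same MySQL instance, each needs its own `server-id` block sized to its source parallelism. A sketch of a simple allocation scheme, contiguous blocks from a base ID (the helper and the scheme are illustrative, not part of Flink CDC):

```bash
#!/usr/bin/env bash
# Non-overlapping mysql-cdc 'server-id' range for the N-th CDC job (0-based) on one MySQL instance:
# one ID per source subtask, blocks laid out back to back from BASE.
server_id_range() {
  local base=$1 job_index=$2 parallelism=$3
  local start=$(( base + job_index * parallelism ))
  echo "${start}-$(( start + parallelism - 1 ))"
}

# Base 5001, parallelism 4: job 0 -> 5001-5004, job 1 -> 5005-5008, job 2 -> 5009-5012
for i in 0 1 2; do echo "job $i: 'server-id' = '$(server_id_range 5001 "$i" 4)'"; done
```

Keep the base clear of the MySQL server's own `server_id` and of any real replicas; if a job's parallelism later grows, give it a fresh block rather than widening it into its neighbour's.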

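6.2 DWD Layer: Detail Cleansing and Filtering

CREATE CATALOG paimon_catalog WITH ('type' = 'paimon', 'warehouse' = 'hdfs:///flink/paimon_warehouse');
USE CATALOG paimon_catalog;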
USE dwd;
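CREATE TABLE IF NOT EXISTS dwd_user_behavior (
    user_id STRING, item_id STRING, behavior STRING,
    operate_time TIMESTAMP(3), dt STRING, hr STRING,
    PRIMARY KEY (user_id, operate_time) NOT ENFORCED,
    -- event-time attribute required by the DWS 1-minute TUMBLE window; tolerates 5 s of disorder
    WATERMARK FOR operate_time AS operate_time - INTERVAL '5' SECOND
) WITH (
    'bucket' = '4', 'file.compression' = 'zstd',
    -- keep the first row per key; with the lookup changelog producer, streaming reads stay insert-only
    'merge-engine' = 'first-row', 'changelog-producer' = 'lookup'
);
INSERT INTO dwd_user_behavior
SELECT * FROM paimon_catalog.ods.ods_user_behavior_paimon
WHERE user_id IS NOT NULL AND item_id IS NOT NULL
  AND behavior IN ('click','cart','fav','buy');
DWD write mode: the primary key makes this a Paimon primary-key table deduplicated on (user_id, operate_time), whereas the ODS append table keeps every row; that is the key difference between the two layers' responsibilities. Paimon has no 'write-mode' = 'merge' option; deduplication behavior is selected with merge-engine. first-row is used instead of the default deduplicate engine because a deduplicate table emits update records when read as a stream, and the DWS group-window aggregation cannot consume updates.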

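6.3 DWS Layer: Per-Minute Window Aggregation

CREATE CATALOG paimon_catalog WITH ('type' = 'paimon', 'warehouse' = 'hdfs:///flink/paimon_warehouse');
USE CATALOG paimon_catalog;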
USE dws;
-- ClickHouse sink: declared TEMPORARY because a Paimon catalog can only store Paimon tables
CREATE TEMPORARY TABLE dws_behavior_1min (
    dt STRING, hr STRING, minute STRING, behavior STRING,
    uv BIGINT, pv BIGINT, window_end TIMESTAMP(3),
    PRIMARY KEY (dt, hr, minute, behavior) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',
    'url' = 'jdbc:clickhouse://192.168.130.145:8123/dws_db',
    'table-name' = 'behavior_1min',
    'username' = 'default', 'password' = '',
    'sink.buffer-flush.interval' = '1s'
);
INSERT INTO dws_behavior_1min
SELECT dt, hr,
    -- label by window start so that minute matches hr (the 10:59 window ends at 11:00)
    DATE_FORMAT(TUMBLE_START(operate_time, INTERVAL '1' MINUTE), 'mm') AS minute,
    behavior, COUNT(DISTINCT user_id) AS uv, COUNT(item_id) AS pv,
    TUMBLE_END(operate_time, INTERVAL '1' MINUTE) AS window_end
FROM paimon_catalog.dwd.dwd_user_behavior
GROUP BY TUMBLE(operate_time, INTERVAL '1' MINUTE), dt, hr, behavior;
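The `minute` column is derived from a window boundary, and which boundary you pick matters at the top of every hour. A sketch of the tumbling-window arithmetic for epoch-aligned windows (the default for TUMBLE without an offset), using seconds since midnight for readability; the helper names are illustrative:

```bash
#!/usr/bin/env bash
# Tumbling window [start, end) containing timestamp TS (seconds), for windows of SIZE seconds.
tumble_start() { local ts=$1 size=$2; echo $(( ts - ts % size )); }
tumble_end()   { local ts=$1 size=$2; echo $(( ts - ts % size + size )); }
# Format seconds since midnight as HH:MM.
fmt_hm() { printf '%02d:%02d\n' $(( $1 / 3600 )) $(( $1 % 3600 / 60 )); }

# An event at 10:59:30 (hr = "10") falls in [10:59, 11:00):
# labelling by the window end gives minute "00" next to hr "10"; the start gives "59".
ts=$(( 10*3600 + 59*60 + 30 ))
echo "start=$(fmt_hm "$(tumble_start $ts 60)") end=$(fmt_hm "$(tumble_end $ts 60)")"
```

Midnight is a multiple of 60 s since the epoch, so aligning on seconds since midnight gives the same 1-minute windows as aligning on the epoch.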

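DWS layer key parameters

Parameter | Description
sink.buffer-flush.interval | Flush interval of the ClickHouse write buffer; 1s balances latency and throughput
TUMBLE(operate_time, INTERVAL '1' MINUTE) | 1-minute tumbling event-time window; every row belongs to exactly one window. operate_time must be a time attribute, i.e. the source table must declare a WATERMARK
COUNT(DISTINCT user_id) | Exact UV per window, computed from the distinct values kept in window state (no approximate sketch such as MinHash is used); the state is freed when the window fires
ClickHouse DDL (run in ClickHouse before starting the job):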
CREATE TABLE IF NOT EXISTS dws_db.behavior_1min (
    dt String, hr String, minute String, behavior String,
    uv UInt64, pv UInt64, window_end DateTime
) ENGINE = MergeTree()
ORDER BY (dt, hr, minute, behavior)
PARTITION BY dt
TTL window_end + INTERVAL 90 DAY;

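ClickHouse table parameters

Parameter | Description
MergeTree() | Engine with partitions, sorting keys and TTL, built for high-throughput inserts. It does not deduplicate: rows with the same ORDER BY key are all kept
ORDER BY | Sorting key; also serves as the sparse primary index used to skip data ranges at query time
PARTITION BY dt | Daily partitions, for date-range queries and TTL expiry
TTL window_end + INTERVAL 90 DAY | Rows are deleted automatically 90 days after window_end
Watermark note: the mysql-cdc connector does not generate watermarks, and TUMBLE needs an event-time attribute on its input table. DWS reads dwd_user_behavior, so declare the watermark in that table's DDL, for example WATERMARK FOR operate_time AS operate_time - INTERVAL '5' SECOND. Rows that arrive after their window has already fired are dropped.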

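6.4 ADS Layer: Daily Business Report

CREATE CATALOG paimon_catalog WITH ('type' = 'paimon', 'warehouse' = 'hdfs:///flink/paimon_warehouse');
USE CATALOG paimon_catalog;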
USE ads;
CREATE TEMPORARY TABLE dws_source (   -- TEMPORARY: a Paimon catalog can only store Paimon tables
    dt STRING, hr STRING, minute STRING, behavior STRING,
    uv BIGINT, pv BIGINT, window_end TIMESTAMP(3)
) WITH (
    'connector' = 'clickhouse',
    'url' = 'jdbc:clickhouse://192.168.130.145:8123/dws_db',
    'table-name' = 'behavior_1min',
    'username' = 'default', 'password' = ''
);
CREATE TEMPORARY TABLE ads_daily_report (   -- TEMPORARY: a Paimon catalog can only store Paimon tables
    stat_date STRING, click_uv BIGINT, buy_uv BIGINT, total_pv BIGINT,
    update_time TIMESTAMP(3),
    PRIMARY KEY (stat_date) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.130.145:3306/ads_db?useSSL=false&serverTimezone=Asia/Shanghai',
    'table-name' = 'daily_report',
    'username' = 'flink', 'password' = 'YourStrongPassword@2026',
    'driver' = 'com.mysql.cj.jdbc.Driver'
);
INSERT INTO ads_daily_report
SELECT dt AS stat_date,
    SUM(CASE WHEN behavior = 'click' THEN uv ELSE 0 END) AS click_uv,
    SUM(CASE WHEN behavior = 'buy' THEN uv ELSE 0 END) AS buy_uv,
    SUM(pv) AS total_pv, CURRENT_TIMESTAMP AS update_time
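FROM dws_source GROUP BY dt;
Note: the JDBC sink writes in upsert mode keyed on stat_date, so ads_db.daily_report must already exist in MySQL with stat_date as its primary key. Also, SUM(uv) over the per-minute rows counts a user once for every minute in which they were active, so click_uv and buy_uv overstate the true daily UV; an exact daily UV requires COUNT(DISTINCT user_id) over the detail (DWD) data.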
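Why summing per-minute UV overstates daily UV, shown on a toy event list (the data is made up for illustration; the helpers are hypothetical):

```bash
#!/usr/bin/env bash
# Input: one "minute user_id" pair per event.
# sum_minute_uv: distinct users per minute, then summed (what SUM(uv) over DWS rows computes).
# daily_uv: distinct users over the whole day (what COUNT(DISTINCT user_id) on detail data computes).
sum_minute_uv() { awk '!seen[$1, $2]++ { n++ } END { print n + 0 }'; }
daily_uv()      { awk '!seen[$2]++ { n++ } END { print n + 0 }'; }

events=$'10:00 u1\n10:00 u2\n10:00 u1\n10:01 u1'
echo "SUM of per-minute UV: $(sum_minute_uv <<< "$events")"   # u1 is counted in both minutes
echo "true daily UV:        $(daily_uv <<< "$events")"
```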

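VII. Standardized Production Job Submission Script

#!/bin/bash
# submit_job.sh <sql_file> <job_name>: submit one SQL file to YARN in Application Mode.
# Flink 1.18's `flink run-application` cannot execute a .sql file itself (there is no -f option, and -j
# names the single program jar). This script therefore assumes a self-built SQL runner jar whose main
# class reads the SQL file and runs each statement via TableEnvironment.executeSql();
# SQL_RUNNER_JAR and SQL_RUNNER_CLASS are hypothetical placeholders for that jar.
# The SQL file is shipped with -Dyarn.ship-files and passed by name (assumes the runner opens it from the container working directory).
# Connector jars (Kafka, mysql-cdc, ClickHouse, paimon-flink-1.18-0.8.2.jar, mysql-connector-java)
# belong in ${FLINK_HOME}/lib, which is shipped to the YARN containers.
FLINK_HOME=/opt/bigdata/flink-1.18.0
CHECKPOINT_BASE=hdfs:///flink/checkpoints
SQL_RUNNER_JAR=${FLINK_HOME}/jobs/flink-sql-runner.jar   # hypothetical
SQL_RUNNER_CLASS=com.example.SqlRunner                  # hypothetical
SQL_FILE=$1; JOB_NAME=$2
if [ -z "$SQL_FILE" ] || [ -z "$JOB_NAME" ]; then echo "Usage: $0 <sql_file> <job_name>"; exit 1; fi
LOG_FILE=${FLINK_HOME}/logs/${JOB_NAME}_$(date +%Y%m%d_%H%M%S).log
mkdir -p ${FLINK_HOME}/logs
# The client exits once YARN has accepted the application, so run it in the foreground and check its exit code
${FLINK_HOME}/bin/flink run-application -t yarn-application \
    -Dyarn.application.name=${JOB_NAME} \
    -Djobmanager.memory.process.size=4g -Dtaskmanager.memory.process.size=8g \
    -Dparallelism.default=4 -Dstate.checkpoints.dir=${CHECKPOINT_BASE}/${JOB_NAME} \
    -Dexecution.checkpointing.interval=30000 \
    -Dyarn.ship-files=${FLINK_HOME}/sql/${SQL_FILE} \
    -c ${SQL_RUNNER_CLASS} ${SQL_RUNNER_JAR} ${SQL_FILE} > ${LOG_FILE} 2>&1
if [ $? -eq 0 ]; then echo "[${JOB_NAME}] submitted, log: ${LOG_FILE}"; else echo "[${JOB_NAME}] submission failed, check the log"; tail -20 ${LOG_FILE}; exit 1; fi

# Save the script as /opt/bigdata/flink-1.18.0/shell/submit_job.sh, then:
chmod +x /opt/bigdata/flink-1.18.0/shell/submit_job.sh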

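Key submission flags

Flag | Description
-t yarn-application | Deploys a dedicated Flink cluster per job in YARN Application Mode; the job's main() runs inside the JobManager
-n / --allowNonRestoredState | Only meaningful together with -s <savepoint path>: lets a restore skip savepoint state that no longer maps to an operator in the new job (--allow-non-restored-state is not a valid spelling)
-j xxx.jar | -j/--jarfile names the job's single program jar and cannot be repeated to add dependencies; put connector jars in $FLINK_HOME/lib, which is shipped to the YARN containers
-f xxx.sql | Not an option of flink run / run-application in Flink 1.18; .sql files are run with sql-client.sh -f, or by a program jar that executes the statements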
/opt/bigdata/flink-1.18.0/shell/submit_job.sh ods_user_behavior.sql ods_job
/opt/bigdata/flink-1.18.0/shell/submit_job.sh dwd_user_behavior.sql dwd_job
/opt/bigdata/flink-1.18.0/shell/submit_job.sh dws_behavior_1min.sql dws_job
/opt/bigdata/flink-1.18.0/shell/submit_job.sh ads_daily_report.sql ads_job
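Later operations (`yarn application -kill`, or Flink CLI calls with `-Dyarn.application.id=...`) need each job's YARN application ID. A sketch that pulls it out of the per-job submission log; `extract_app_id` is a hypothetical helper, and it relies only on the standard `application_<clusterTimestamp>_<sequence>` ID format, not on any exact log wording:

```bash
#!/usr/bin/env bash
# Print the last YARN application ID mentioned in a log file.
extract_app_id() {
  grep -oE 'application_[0-9]+_[0-9]+' "$1" | tail -n 1
}

# Usage (log file name is illustrative):
# app_id=$(extract_app_id /opt/bigdata/flink-1.18.0/logs/ods_job_20260608_101500.log)
# flink list -t yarn-application -Dyarn.application.id="$app_id"
```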

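VIII. Enterprise Operations Handbook

yarn application -list | grep Flink                          # running Flink applications and their YARN application IDs
tail -f /opt/bigdata/flink-1.18.0/logs/ods_job_*.log          # client-side submission log written by submit_job.sh
# Stop with a final savepoint (preferred); yarn application -kill stops without one
flink stop -t yarn-application -Dyarn.application.id=<application ID> --savepointPath hdfs:///flink/savepoints/<job_name> <JobID>
yarn application -kill <application ID>
# Trigger a savepoint while the job keeps running
flink savepoint -t yarn-application -Dyarn.application.id=<application ID> <JobID> hdfs:///flink/savepoints/<job_name>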

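8.3 Automatically Clean Up Expired Checkpoints

# crontab entry: clean checkpoints only, never Savepoints! (crontab treats % as a newline, hence \%)
# hdfs dfs -find has no -mtime/-delete, so list the per-job checkpoint directories and remove those untouched for 7+ days
0 2 * * * . /etc/profile; hdfs dfs -ls '/flink/checkpoints/*' | awk -v c="$(date -d '-7 days' +\%Y-\%m-\%d)" '$1 ~ /^d/ && $6 < c {print $8}' | xargs -r hdfs dfs -rm -r -skipTrash
Important: never delete Savepoints automatically. Keep savepoints in one directory per job name: hdfs:///flink/savepoints/{job_name}/. A running job's checkpoint directory is modified at every checkpoint, so only directories of stopped jobs age past the cutoff; never delete files inside a live job's directory, because its shared/ folder holds files still referenced by incremental checkpoints.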
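Before letting a deletion run from cron, the same filter can be dry-run to see exactly which directories it would remove. A sketch of the filter as a function over `hdfs dfs -ls` output; `old_dirs` is a hypothetical helper and assumes the standard 8-column listing with ISO dates:

```bash
#!/usr/bin/env bash
# From `hdfs dfs -ls` output (perms, replication, owner, group, size, date, time, path),
# print directory paths whose modification date is before CUTOFF (YYYY-MM-DD).
# ISO dates sort correctly as strings, so no date parsing is needed.
old_dirs() {
  local cutoff=$1
  awk -v c="$cutoff" '$1 ~ /^d/ && NF >= 8 && $6 < c { print $8 }'
}

# Dry run (GNU date): list candidate job-ID directories older than 7 days, then review them
# hdfs dfs -ls '/flink/checkpoints/*' | old_dirs "$(date -d '-7 days' +%Y-%m-%d)"
```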

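8.4 Key Monitoring Metrics

Metric | Alert threshold | Meaning
lastCheckpointDuration | > 5 min | State too large or slow HDFS writes
numberOfFailedCheckpoints | any increase | Investigate immediately
lastCheckpointSize | keeps growing | Possible state leak (e.g. ever-growing keys without state TTL)
busyTimeMsPerSecond | > 800 (busy 80% of the time) | This operator is a bottleneck and is likely causing backpressure upstream
records-lag-max (Kafka sources) | keeps growing | Consumption is falling behind; scale out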
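The busy-time threshold can be checked directly against a TaskManager's Prometheus endpoint. A sketch that prints every subtask sample above a threshold in ms/s; the metric name `flink_taskmanager_job_task_busyTimeMsPerSecond` is assumed from the Prometheus reporter's scope-based naming, so confirm it against your own /metrics output (`busy_subtasks` is a hypothetical helper):

```bash
#!/usr/bin/env bash
# Read Prometheus text format on stdin; print busyTimeMsPerSecond samples whose value exceeds
# THRESHOLD (default 800 ms/s, i.e. busy 80% of the time). The value is the last field.
busy_subtasks() {
  local threshold=${1:-800}
  awk -v t="$threshold" '
    /^flink_taskmanager_job_task_busyTimeMsPerSecond[{ ]/ && ($NF + 0) > t { print }
  '
}

# Usage (9249 is the first port of the reporter port range):
# curl -s http://hadoop103:9249/metrics | busy_subtasks 800
```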

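IX. Production Standards and Pitfalls

Go-Live Rules

9.1 Paimon Compaction Tuning

# Paimon table options: set them in a table's WITH clause or with ALTER TABLE ... SET (...); they are not flink-conf.yaml keys
# Asynchronous compaction (per the Paimon docs): writers never stall waiting for compaction
'num-sorted-run.stop-trigger' = '2147483647'
'sort-spill-threshold' = '10'
'changelog-producer.lookup-wait' = 'false'
# 'num-sorted-run.compaction-trigger' = '3'
# 'compaction.min.file-num' = '3'
# 'compaction.max.file-num' = '10'

Paimon compaction parameters

Parameter | Description
num-sorted-run.stop-trigger | Number of sorted runs at which writes stop and wait for compaction; a very large value makes compaction fully asynchronous, at the cost of slower reads while runs pile up
sort-spill-threshold | Spill to disk once the number of sort readers exceeds this value, bounding memory when many runs accumulate
changelog-producer.lookup-wait | For the lookup changelog producer: false lets commits proceed without waiting for lookup changelog generation
num-sorted-run.compaction-trigger | Number of sorted runs (primary-key tables) that triggers a compaction
compaction.min/max-file-num | Minimum/maximum number of files merged in one compaction of an append table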

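9.2 Version Compatibility Matrix

Component | Version | Notes
Flink | 1.18.0 | scala_2.12 build
Flink CDC | 2.4.0 | mysql-cdc connector; its release notes list Flink 1.13 to 1.17, so validate it on 1.18 (Flink CDC 3.0.x targets 1.18)
Paimon | 0.8.2 | paimon-flink-1.18 jar
ClickHouse Connector | 1.18.0 | jdbc:clickhouse:// URL prefix
Hadoop | 3.3.6 | Underlying platform
MySQL | 8.0 | binlog_format=ROW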